[#10280] Support Iceberg snapshot maintenance procedures via Gravitino Trino#10500
[#10280] Support Iceberg snapshot maintenance procedures via Gravitino Trino#10500laserninja wants to merge 1 commit intoapache:mainfrom
Conversation
…avitino Trino Connector Delegate Iceberg snapshot maintenance procedures (expire_snapshots, remove_orphan_files, rewrite_data_files/optimize, rewrite_manifests) from the Gravitino Trino Connector to the internal Iceberg connector. Changes: - GravitinoConnector: Add getProcedures() and getTableProcedures() - GravitinoMetadata: Add getLayoutForTableExecute(), beginTableExecute(), finishTableExecute() in base class - Version-specific metadata classes: Add getTableHandleForExecute() and executeTableExecute() with correct SPI signatures per Trino version - GravitinoPageSinkProvider: Add createPageSink for ConnectorTableExecuteHandle - Add unit tests (TestGravitinoConnectorProcedures) and integration tests Closes apache#10280
There was a problem hiding this comment.
Pull request overview
This PR adds support for delegating Iceberg snapshot/table maintenance procedures (e.g., expire_snapshots, remove_orphan_files, optimize) through the Gravitino Trino connector by wiring Trino “table execute” and procedure APIs through to the internal Iceberg connector.
Changes:
- Delegate
getProcedures()/getTableProcedures()fromGravitinoConnectorto the internal connector. - Add/bridge “table execute” lifecycle methods in
GravitinoMetadataand version-specific metadata adapters, plus aConnectorTableExecuteHandlepage sink path. - Add unit + integration test coverage for procedure/table-execute delegation.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java | Delegates procedure and table-procedure discovery to the internal connector. |
| trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java | Adds table-execute layout + begin/finish delegation and handle wrapping for source table handle. |
| trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java | Adds page sink creation overload for ConnectorTableExecuteHandle. |
| trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java | Adds table-execute handle acquisition and execution delegation for Trino 435–439. |
| trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java | Adds table-execute handle acquisition and execution delegation for Trino 440–445. |
| trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java | Adds table-execute handle acquisition and execution delegation for Trino 446–451. |
| trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java | Adds table-execute handle acquisition and execution delegation for Trino 452–468. |
| trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java | Adds table-execute handle acquisition (with access control) and execution delegation for Trino 469–472. |
| trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java | Adds table-execute handle acquisition (with access control) and execution delegation for Trino 473–478. |
| trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java | Unit tests for procedure delegation + table-execute delegation/wrapping behavior. |
| trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql | Adds SQL integration coverage for Iceberg maintenance procedures via Gravitino. |
| trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt | Expected output for the new snapshot maintenance integration test. |
| return internalMetadata.getTableHandleForExecute( | ||
| session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); | ||
| } | ||
|
|
||
| @Override | ||
| public void executeTableExecute( | ||
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | ||
| internalMetadata.executeTableExecute(session, tableExecuteHandle); |
There was a problem hiding this comment.
getTableHandleForExecute returns the internal connector’s ConnectorTableExecuteHandle directly. Given the internal connector runs in a separate plugin classloader, this can fail when Trino serializes the execute handle for distributed table execute. Consider wrapping the execute handle in a Gravitino handle wrapper (and unwrapping it before delegation), consistent with how insert/merge handles are handled.
| return internalMetadata.getTableHandleForExecute( | |
| session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute(session, tableExecuteHandle); | |
| Optional<ConnectorTableExecuteHandle> internalHandle = | |
| internalMetadata.getTableHandleForExecute( | |
| session, | |
| GravitinoHandle.unWrap(tableHandle), | |
| procedureName, | |
| executeProperties, | |
| retryMode); | |
| if (!internalHandle.isPresent()) { | |
| return internalHandle; | |
| } | |
| SchemaTableName tableName = getTableName(tableHandle); | |
| return internalHandle.map( | |
| handle -> | |
| new GravitinoTableExecuteHandle( | |
| tableName.getSchemaName(), tableName.getTableName(), handle)); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); |
| @Override | ||
| public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute( | ||
| ConnectorSession session, | ||
| ConnectorTableHandle tableHandle, | ||
| String procedureName, | ||
| Map<String, Object> executeProperties, | ||
| RetryMode retryMode) { | ||
| return internalMetadata.getTableHandleForExecute( | ||
| session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); | ||
| } |
There was a problem hiding this comment.
getTableHandleForExecute returns the internal connector’s ConnectorTableExecuteHandle directly. Since internal connectors are created from separate plugin classloaders, this execute handle may not be safely serializable/deserializable by Trino for distributed execution. Wrap the execute handle in a Gravitino handle wrapper (and unwrap when delegating) to be consistent with GravitinoInsertTableHandle/GravitinoMergeTableHandle.
| return internalMetadata.getTableHandleForExecute( | ||
| session, | ||
| accessControl, | ||
| GravitinoHandle.unWrap(tableHandle), | ||
| procedureName, | ||
| executeProperties, | ||
| retryMode); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Long> executeTableExecute( | ||
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | ||
| return internalMetadata.executeTableExecute(session, tableExecuteHandle); |
There was a problem hiding this comment.
getTableHandleForExecute returns the internal connector’s ConnectorTableExecuteHandle directly. With internal connectors loaded from separate plugin classloaders, this execute handle may not be serializable/deserializable by Trino when running table execute across workers. Wrap the handle in a Gravitino wrapper (and unwrap it when delegating) similarly to how insert/merge handles are wrapped.
| return internalMetadata.getTableHandleForExecute( | |
| session, | |
| accessControl, | |
| GravitinoHandle.unWrap(tableHandle), | |
| procedureName, | |
| executeProperties, | |
| retryMode); | |
| } | |
| @Override | |
| public Map<String, Long> executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| return internalMetadata.executeTableExecute(session, tableExecuteHandle); | |
| return internalMetadata | |
| .getTableHandleForExecute( | |
| session, | |
| accessControl, | |
| GravitinoHandle.unWrap(tableHandle), | |
| procedureName, | |
| executeProperties, | |
| retryMode) | |
| .map(GravitinoHandle::wrap); | |
| } | |
| @Override | |
| public Map<String, Long> executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| return internalMetadata.executeTableExecute( | |
| session, GravitinoHandle.unWrap(tableExecuteHandle)); |
| BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> result = | ||
| internalMetadata.beginTableExecute( | ||
| session, tableExecuteHandle, GravitinoHandle.unWrap(updatedSourceTableHandle)); | ||
| SchemaTableName tableName = getTableName(updatedSourceTableHandle); | ||
| return new BeginTableExecuteResult<>( | ||
| result.getTableExecuteHandle(), | ||
| new GravitinoTableHandle( | ||
| tableName.getSchemaName(), tableName.getTableName(), result.getSourceHandle())); | ||
| } |
There was a problem hiding this comment.
ConnectorTableExecuteHandle is being passed through and returned as the internal connector’s handle (e.g., result.getTableExecuteHandle()), unlike other handle types (table/insert/merge/partitioning) that are wrapped for cross-plugin-classloader serialization. This can break table procedure execution in distributed mode when Trino needs to serialize the execute handle. Introduce a Gravitino wrapper for ConnectorTableExecuteHandle (similar to GravitinoInsertTableHandle) and unwrap it before delegating to internalMetadata/pageSinkProvider.
| ConnectorTableExecuteHandle tableExecuteHandle, | ||
| ConnectorPageSinkId pageSinkId) { | ||
| return pageSinkProvider.createPageSink( | ||
| GravitinoHandle.unWrap(transactionHandle), session, tableExecuteHandle, pageSinkId); |
There was a problem hiding this comment.
createPageSink(..., ConnectorTableExecuteHandle, ...) forwards tableExecuteHandle without unwrapping/wrapping. If the execute handle comes from an internal connector classloader, Trino may need it to be serialized across nodes, and this connector already wraps other internal handles for that reason. Consider adding a Gravitino wrapper for ConnectorTableExecuteHandle and unwrap it here before delegating (consistent with the insert/merge overloads).
| GravitinoHandle.unWrap(transactionHandle), session, tableExecuteHandle, pageSinkId); | |
| GravitinoHandle.unWrap(transactionHandle), | |
| session, | |
| GravitinoHandle.unWrap(tableExecuteHandle), | |
| pageSinkId); |
| return internalMetadata.getTableHandleForExecute( | ||
| session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); | ||
| } | ||
|
|
||
| @Override | ||
| public void executeTableExecute( | ||
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | ||
| internalMetadata.executeTableExecute(session, tableExecuteHandle); |
There was a problem hiding this comment.
getTableHandleForExecute returns the internal connector’s ConnectorTableExecuteHandle directly. Since internal connectors are loaded via separate PluginClassLoaders, this handle may not be serializable/deserializable by Trino when distributed execution requires shipping the handle to workers. Wrap the returned execute handle in a Gravitino handle wrapper (and unwrap it when delegating) to match the existing pattern used for insert/merge handles.
| return internalMetadata.getTableHandleForExecute( | |
| session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute(session, tableExecuteHandle); | |
| Optional<ConnectorTableExecuteHandle> internalHandle = | |
| internalMetadata.getTableHandleForExecute( | |
| session, | |
| GravitinoHandle.unWrap(tableHandle), | |
| procedureName, | |
| executeProperties, | |
| retryMode); | |
| if (!internalHandle.isPresent()) { | |
| return internalHandle; | |
| } | |
| SchemaTableName tableName = getTableName(tableHandle); | |
| return Optional.of( | |
| new GravitinoHandle( | |
| tableName.getSchemaName(), tableName.getTableName(), internalHandle.get())); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute( | |
| session, GravitinoHandle.unWrap(tableExecuteHandle)); |
| return internalMetadata.getTableHandleForExecute( | ||
| session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); | ||
| } | ||
|
|
||
| @Override | ||
| public void executeTableExecute( | ||
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | ||
| internalMetadata.executeTableExecute(session, tableExecuteHandle); |
There was a problem hiding this comment.
getTableHandleForExecute returns the internal connector’s ConnectorTableExecuteHandle directly. If the internal connector is loaded in a separate plugin classloader, Trino may be unable to serialize/deserialize this handle during distributed execution of table procedures. Wrap it in a Gravitino handle wrapper and unwrap it before delegating, similar to the existing insert/merge handle pattern.
| return internalMetadata.getTableHandleForExecute( | |
| session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute(session, tableExecuteHandle); | |
| Optional<ConnectorTableExecuteHandle> internalHandle = | |
| internalMetadata.getTableHandleForExecute( | |
| session, | |
| GravitinoHandle.unWrap(tableHandle), | |
| procedureName, | |
| executeProperties, | |
| retryMode); | |
| SchemaTableName tableName = getTableName(tableHandle); | |
| return internalHandle.map( | |
| handle -> | |
| new GravitinoTableExecuteHandle( | |
| tableName.getSchemaName(), tableName.getTableName(), handle)); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); |
| return internalMetadata.getTableHandleForExecute( | ||
| session, | ||
| accessControl, | ||
| GravitinoHandle.unWrap(tableHandle), | ||
| procedureName, | ||
| executeProperties, | ||
| retryMode); | ||
| } | ||
|
|
||
| @Override | ||
| public void executeTableExecute( | ||
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | ||
| internalMetadata.executeTableExecute(session, tableExecuteHandle); |
There was a problem hiding this comment.
getTableHandleForExecute returns the internal connector’s ConnectorTableExecuteHandle directly. Because the internal connector is loaded via a separate plugin classloader, this handle can fail to serialize/deserialize when Trino distributes the table execute operation. Wrap the returned execute handle in a Gravitino handle wrapper and unwrap it when delegating (consistent with the insert/merge handle wrappers).
| return internalMetadata.getTableHandleForExecute( | |
| session, | |
| accessControl, | |
| GravitinoHandle.unWrap(tableHandle), | |
| procedureName, | |
| executeProperties, | |
| retryMode); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute(session, tableExecuteHandle); | |
| SchemaTableName tableName = getTableName(tableHandle); | |
| return internalMetadata | |
| .getTableHandleForExecute( | |
| session, | |
| accessControl, | |
| GravitinoHandle.unWrap(tableHandle), | |
| procedureName, | |
| executeProperties, | |
| retryMode) | |
| .map( | |
| handle -> | |
| new GravitinoTableExecuteHandle( | |
| tableName.getSchemaName(), tableName.getTableName(), handle)); | |
| } | |
| @Override | |
| public void executeTableExecute( | |
| ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { | |
| internalMetadata.executeTableExecute( | |
| session, GravitinoHandle.unWrap(tableExecuteHandle)); |
| SELECT count(*) >= 3 FROM "gt_snapshot_test"."maintenance_table$snapshots"; | ||
|
|
||
| -- Test expire_snapshots procedure (expire snapshots older than a far future timestamp keeps all) | ||
| ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '0s'); |
There was a problem hiding this comment.
The comment says the expire_snapshots call uses “a far future timestamp keeps all”, but the statement uses retention_threshold => '0s' (which is effectively immediate and can expire snapshots older than “now”). Either adjust the comment to match the behavior being tested, or change the parameters so the SQL matches the stated intent.
| ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '0s'); | |
| ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '36500d'); |
What changes were proposed in this pull request?
Delegate Iceberg snapshot maintenance procedures (expire_snapshots, remove_orphan_files, rewrite_data_files/optimize, rewrite_manifests) from the Gravitino Trino Connector to the internal Iceberg connector.
Changes:
Why are the changes needed?
Fix #10280
Does this PR introduce any user-facing change?
No.
How was this patch tested?
All the tests passed